/*
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 */

/*
 *
 *
 *
 *
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.*;

/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of
 * {@code Delayed} elements, in which an element can only be taken
 * when its delay has expired.  The <em>head</em> of the queue is that
 * {@code Delayed} element whose delay expired furthest in the
 * past.  If no delay has expired there is no head and {@code poll}
 * will return {@code null}. Expiration occurs when an element's
 * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
 * than or equal to zero.  Even though unexpired elements cannot be
 * removed using {@code take} or {@code poll}, they are otherwise
 * treated as normal elements. For example, the {@code size} method
 * returns the count of both expired and unexpired elements.
 * This queue does not permit null elements.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.  The Iterator provided in method {@link
 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
 * the DelayQueue in any particular order.
 *
 * <p>Every operation that touches the backing {@code PriorityQueue}
 * does so while holding a single internal {@code ReentrantLock}.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @param <E> the type of elements held in this collection
 * @author Doug Lea
 * @since 1.5
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

  /** Lock held around every access to {@code q} and {@code leader}. */
  private final transient ReentrantLock lock = new ReentrantLock();

  /** Backing priority queue; its head is the element returned by {@link #peek()}. */
  private final PriorityQueue<E> q = new PriorityQueue<E>();

  /**
   * Thread designated to wait for the element at the head of
   * the queue.  This variant of the Leader-Follower pattern
   * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
   * minimize unnecessary timed waiting.  When a thread becomes
   * the leader, it waits only for the next delay to elapse, but
   * other threads await indefinitely.  The leader thread must
   * signal some other thread before returning from take() or
   * poll(...), unless some other thread becomes leader in the
   * interim.  Whenever the head of the queue is replaced with
   * an element with an earlier expiration time, the leader
   * field is invalidated by being reset to null, and some
   * waiting thread, but not necessarily the current leader, is
   * signalled.  So waiting threads must be prepared to acquire
   * and lose leadership while waiting.
   */
  private Thread leader;

  /**
   * Condition signalled when a newer element becomes available
   * at the head of the queue or a new thread may need to
   * become leader.
   */
  private final Condition available = lock.newCondition();

  /**
   * Creates a new {@code DelayQueue} that is initially empty.
   */
  public DelayQueue() {
  }

  /**
   * Creates a {@code DelayQueue} initially containing the elements of the
   * given collection of {@link Delayed} instances.
   *
   * @param c the collection of elements to initially contain
   * @throws NullPointerException if the specified collection or any of its elements are null
   */
  public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
  }

  /**
   * Inserts the specified element into this delay queue.
   *
   * @param e the element to add
   * @return {@code true} (as specified by {@link Collection#add})
   * @throws NullPointerException if the specified element is null
   */
  @Override
  public boolean add(E e) {
    return offer(e);
  }

  /**
   * Inserts the specified element into this delay queue.
   *
   * @param e the element to add
   * @return {@code true}
   * @throws NullPointerException if the specified element is null
   */
  @Override
  public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      q.offer(e);
      // The new element became the head: any timed wait in progress was
      // computed for the previous head, so drop the leader and wake a waiter
      // to re-evaluate.
      if (q.peek() == e) {
        leader = null;
        available.signal();
      }
      return true;
    } finally {
      lock.unlock();
    }
  }

  /**
   * Inserts the specified element into this delay queue. As the queue is
   * unbounded this method will never block.
   *
   * @param e the element to add
   * @throws NullPointerException {@inheritDoc}
   */
  @Override
  public void put(E e) {
    offer(e);
  }

  /**
   * Inserts the specified element into this delay queue. As the queue is
   * unbounded this method will never block.
   *
   * @param e the element to add
   * @param timeout This parameter is ignored as the method never blocks
   * @param unit This parameter is ignored as the method never blocks
   * @return {@code true}
   * @throws NullPointerException {@inheritDoc}
   */
  @Override
  public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
  }

  /**
   * Retrieves and removes the head of this queue, or returns {@code null}
   * if this queue has no elements with an expired delay.
   *
   * @return the head of this queue, or {@code null} if this queue has no elements with an expired
   * delay
   */
  @Override
  public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      return (peekExpired() == null) ? null : q.poll();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Retrieves and removes the head of this queue, waiting if necessary
   * until an element with an expired delay is available on this queue.
   *
   * @return the head of this queue
   * @throws InterruptedException {@inheritDoc}
   */
  @Override
  public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
      for (; ; ) {
        E first = q.peek();
        if (first == null) {
          // Empty queue: wait until offer() signals.
          available.await();
        } else {
          long delay = first.getDelay(NANOSECONDS);
          if (delay <= 0) {
            return q.poll();
          }
          first = null; // don't retain ref while waiting
          if (leader != null) {
            // Another thread is already timing the head; wait untimed.
            available.await();
          } else {
            // Become leader and wait only as long as the head's delay.
            Thread thisThread = Thread.currentThread();
            leader = thisThread;
            try {
              available.awaitNanos(delay);
            } finally {
              // Give up leadership unless offer() already reset it or
              // another thread took it over in the meantime.
              if (leader == thisThread) {
                leader = null;
              }
            }
          }
        }
      }
    } finally {
      // On the way out (normal or exceptional), hand over to a waiter if
      // nobody is leading and elements remain.
      if (leader == null && q.peek() != null) {
        available.signal();
      }
      lock.unlock();
    }
  }

  /**
   * Retrieves and removes the head of this queue, waiting if necessary
   * until an element with an expired delay is available on this queue,
   * or the specified wait time expires.
   *
   * @param timeout how long to wait before giving up, in units of {@code unit}
   * @param unit the time unit of the {@code timeout} argument
   * @return the head of this queue, or {@code null} if the specified waiting time elapses before an
   * element with an expired delay becomes available
   * @throws InterruptedException {@inheritDoc}
   */
  @Override
  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
      for (; ; ) {
        E first = q.peek();
        if (first == null) {
          if (nanos <= 0) {
            return null;
          } else {
            nanos = available.awaitNanos(nanos);
          }
        } else {
          long delay = first.getDelay(NANOSECONDS);
          if (delay <= 0) {
            return q.poll();
          }
          if (nanos <= 0) {
            return null;
          }
          first = null; // don't retain ref while waiting
          if (nanos < delay || leader != null) {
            // Either the caller's budget runs out before the head expires,
            // or someone else is leading: wait for the remaining budget.
            nanos = available.awaitNanos(nanos);
          } else {
            Thread thisThread = Thread.currentThread();
            leader = thisThread;
            try {
              long timeLeft = available.awaitNanos(delay);
              // Charge the time actually spent waiting against the budget.
              nanos -= delay - timeLeft;
            } finally {
              if (leader == thisThread) {
                leader = null;
              }
            }
          }
        }
      }
    } finally {
      if (leader == null && q.peek() != null) {
        available.signal();
      }
      lock.unlock();
    }
  }

  /**
   * Retrieves, but does not remove, the head of this queue, or
   * returns {@code null} if this queue is empty.  Unlike
   * {@code poll}, if no expired elements are available in the queue,
   * this method returns the element that will expire next,
   * if one exists.
   *
   * @return the head of this queue, or {@code null} if this queue is empty
   */
  @Override
  public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      return q.peek();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Returns the number of elements in this queue, counting both expired
   * and unexpired elements.
   *
   * @return the number of elements in this queue
   */
  @Override
  public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      return q.size();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Returns first element only if it is expired.
   * Used by poll() and the drainTo methods.  Call only when holding lock.
   */
  private E peekExpired() {
    // assert lock.isHeldByCurrentThread();
    E first = q.peek();
    return (first == null || first.getDelay(NANOSECONDS) > 0) ?
        null : first;
  }

  /**
   * Shared implementation of both {@code drainTo} overloads: validates the
   * target collection, then moves at most {@code maxElements} expired
   * elements from this queue into it while holding the lock.
   *
   * @param c the collection to transfer elements into
   * @param maxElements the maximum number of elements to transfer
   * @return the number of elements transferred
   * @throws NullPointerException if {@code c} is null
   * @throws IllegalArgumentException if {@code c} is this queue
   */
  private int drainExpired(Collection<? super E> c, int maxElements) {
    if (c == null) {
      throw new NullPointerException();
    }
    if (c == this) {
      throw new IllegalArgumentException();
    }
    if (maxElements <= 0) {
      return 0;
    }
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      int n = 0;
      for (E e; n < maxElements && (e = peekExpired()) != null; ) {
        c.add(e);       // In this order, in case add() throws.
        q.poll();
        ++n;
      }
      return n;
    } finally {
      lock.unlock();
    }
  }

  /**
   * Removes all expired elements from this queue and adds them to the
   * given collection.  Unexpired elements are left in the queue.
   *
   * @param c the collection to transfer elements into
   * @return the number of elements transferred
   * @throws UnsupportedOperationException {@inheritDoc}
   * @throws ClassCastException {@inheritDoc}
   * @throws NullPointerException {@inheritDoc}
   * @throws IllegalArgumentException {@inheritDoc}
   */
  @Override
  public int drainTo(Collection<? super E> c) {
    return drainExpired(c, Integer.MAX_VALUE);
  }

  /**
   * Removes at most {@code maxElements} expired elements from this queue
   * and adds them to the given collection.  Unexpired elements are left
   * in the queue.
   *
   * @param c the collection to transfer elements into
   * @param maxElements the maximum number of elements to transfer
   * @return the number of elements transferred
   * @throws UnsupportedOperationException {@inheritDoc}
   * @throws ClassCastException {@inheritDoc}
   * @throws NullPointerException {@inheritDoc}
   * @throws IllegalArgumentException {@inheritDoc}
   */
  @Override
  public int drainTo(Collection<? super E> c, int maxElements) {
    return drainExpired(c, maxElements);
  }

  /**
   * Atomically removes all of the elements from this delay queue.
   * The queue will be empty after this call returns.
   * Elements with an unexpired delay are not waited for; they are
   * simply discarded from the queue.
   */
  @Override
  public void clear() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      q.clear();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Always returns {@code Integer.MAX_VALUE} because
   * a {@code DelayQueue} is not capacity constrained.
   *
   * @return {@code Integer.MAX_VALUE}
   */
  @Override
  public int remainingCapacity() {
    return Integer.MAX_VALUE;
  }

  /**
   * Returns an array containing all of the elements in this queue.
   * The returned array elements are in no particular order.
   *
   * <p>The returned array will be "safe" in that no references to it are
   * maintained by this queue.  (In other words, this method must allocate
   * a new array).  The caller is thus free to modify the returned array.
   *
   * <p>This method acts as bridge between array-based and collection-based
   * APIs.
   *
   * @return an array containing all of the elements in this queue
   */
  @Override
  public Object[] toArray() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      return q.toArray();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Returns an array containing all of the elements in this queue; the
   * runtime type of the returned array is that of the specified array.
   * The returned array elements are in no particular order.
   * If the queue fits in the specified array, it is returned therein.
   * Otherwise, a new array is allocated with the runtime type of the
   * specified array and the size of this queue.
   *
   * <p>If this queue fits in the specified array with room to spare
   * (i.e., the array has more elements than this queue), the element in
   * the array immediately following the end of the queue is set to
   * {@code null}.
   *
   * <p>Like the {@link #toArray()} method, this method acts as bridge between
   * array-based and collection-based APIs.  Further, this method allows
   * precise control over the runtime type of the output array, and may,
   * under certain circumstances, be used to save allocation costs.
   *
   * <p>The following code can be used to dump a delay queue into a newly
   * allocated array of {@code Delayed}:
   *
   * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
   *
   * Note that {@code toArray(new Object[0])} is identical in function to
   * {@code toArray()}.
   *
   * @param a the array into which the elements of the queue are to be stored, if it is big enough;
   * otherwise, a new array of the same runtime type is allocated for this purpose
   * @return an array containing all of the elements in this queue
   * @throws ArrayStoreException if the runtime type of the specified array is not a supertype of
   * the runtime type of every element in this queue
   * @throws NullPointerException if the specified array is null
   */
  @Override
  public <T> T[] toArray(T[] a) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      return q.toArray(a);
    } finally {
      lock.unlock();
    }
  }

  /**
   * Removes a single instance of the specified element from this
   * queue, if it is present, whether or not it has expired.
   *
   * @param o the element to remove
   * @return {@code true} if an element was removed
   */
  @Override
  public boolean remove(Object o) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      return q.remove(o);
    } finally {
      lock.unlock();
    }
  }

  /**
   * Identity-based version for use in Itr.remove.  Removes the first
   * element that is the very same object as {@code o} ({@code ==}, not
   * {@code equals}); does nothing if no such element is present.
   */
  void removeEQ(Object o) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
        if (o == it.next()) {
          it.remove();
          break;
        }
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * Returns an iterator over all the elements (both expired and
   * unexpired) in this queue. The iterator does not return the
   * elements in any particular order.
   *
   * <p>The returned iterator is
   * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
   *
   * @return an iterator over the elements in this queue
   */
  @Override
  public Iterator<E> iterator() {
    return new Itr(toArray());
  }

  /**
   * Snapshot iterator that works off copy of underlying q array.
   */
  private class Itr implements Iterator<E> {

    final Object[] array; // Array of all elements
    int cursor;           // index of next element to return
    int lastRet;          // index of last element, or -1 if no such

    Itr(Object[] array) {
      lastRet = -1;
      this.array = array;
    }

    /** Returns {@code true} while snapshot elements remain to be returned. */
    @Override
    public boolean hasNext() {
      return cursor < array.length;
    }

    /**
     * Returns the next element of the snapshot.
     *
     * @throws NoSuchElementException if the snapshot is exhausted
     */
    @Override
    @SuppressWarnings("unchecked")
    public E next() {
      if (cursor >= array.length) {
        throw new NoSuchElementException();
      }
      lastRet = cursor;
      return (E) array[cursor++];
    }

    /**
     * Removes from the enclosing queue, by identity, the element most
     * recently returned by {@link #next()}.
     *
     * @throws IllegalStateException if {@code next} has not been called since construction or
     * since the previous {@code remove}
     */
    @Override
    public void remove() {
      if (lastRet < 0) {
        throw new IllegalStateException();
      }
      removeEQ(array[lastRet]);
      lastRet = -1;
    }
  }

}
